package com.crazymaker.cloud.disruptor.demo.business.impl;


import com.crazymaker.cloud.disruptor.demo.business.AsyncProducer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class ResizableDisruptorProducer implements AsyncProducer {
    // 位运算 取模 轮询

    private AtomicInteger idx = new AtomicInteger();
    private AtomicInteger remainingCapacity = new AtomicInteger(1024);
    private Disruptor<LongEvent>[] executors;

    //一个translator可以看做一个事件初始化器，publicEvent方法会调用它
    //填充Event
    private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, Long>() {
                public void translateTo(LongEvent event, long sequence, Long data) {
                    event.setValue(data);
                }
            };
    private final RingBuffer<LongEvent> ringBuffer;

    public ResizableDisruptorProducer() {
        executors = new Disruptor[1];
        executors[0] = disruptor();
        this.ringBuffer = executors[0].getRingBuffer();

    }

    public void publishData(Long data) {
        log.info("生产一个数据：" + data + " | ringBuffer.remainingCapacity()= " + ringBuffer.remainingCapacity());

        if (ringBuffer.remainingCapacity() <= 0) {
            throw new RuntimeException("队列满了，不能再生产，这里要快速失败");
        }
        if (executors.length == 1) {
            ringBuffer.publishEvent(TRANSLATOR, data);
        } else {
            next().publishEvent(TRANSLATOR, data);
        }

        remainingCapacity.set((int) ringBuffer.remainingCapacity());


    }

    public synchronized void resize() {
        int oldLength = executors.length;

        int newLength = oldLength * 2;

        Disruptor<LongEvent>[] newExecutors = new Disruptor[newLength];
        int index = 0;
        for (; index < oldLength; index++) newExecutors[index] = executors[index];
        for (; index < newLength; index++) newExecutors[index] = disruptor();

        executors = newExecutors;

    }

    public long remainingCapacity() {

        return ringBuffer.remainingCapacity();
    }

    public RingBuffer<LongEvent> next() {

        //  executors.length  为 2的幂次方
        //   假如 executors.length=8,  executors.length -1 =7 , 意味着后面的 3位全是 1， 二进制为 111
        //  executors.length= 16= 2^4 , 2^4-1 = 1111
        //  executors.length= 32= 2^5 , 2^5-1 = 11111

        // 高性能的取模
        return executors[idx.getAndIncrement() & executors.length - 1].getRingBuffer();
    }


    private Disruptor<LongEvent> disruptor() {

        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();

        LongEventFactory eventFactory = new LongEventFactory();
        int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, bufferSize, namedThreadFactory,
                ProducerType.MULTI, new BlockingWaitStrategy());

        // 连接 消费者 处理器 ，两个消费者
        LongEventWorkHandler1 handler1 = new LongEventWorkHandler1();
        LongEventWorkHandler2 handler2 = new LongEventWorkHandler2();
        disruptor.handleEventsWith(handler1, handler2);
        disruptor.handleExceptionsFor(handler1).with(exceptionHandler);
        disruptor.handleExceptionsFor(handler2).with(exceptionHandler);


        // 开启 分裂者（事件分发）
        disruptor.start();

        return disruptor;
    }


    private MeterRegistry meterRegistry;

    public void setMeterRegistry(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        meterRegistry.gauge("disruptor_remaining_capacity", remainingCapacity);

    }

    ExceptionHandler exceptionHandler = new ExceptionHandler<LongEvent>() {
        @Override
        public void handleEventException(Throwable throwable, long l, LongEvent longEvent) {
            throwable.printStackTrace();
        }

        @Override
        public void handleOnStartException(Throwable throwable) {
            log.info("Exception Start to Handle!");
        }

        @Override
        public void handleOnShutdownException(Throwable throwable) {
            log.info("Exception Handled!");
        }
    };
}

